{#
  Template for 'response_metrics.h'.

  Generates the response metric names from Kafka message types.
  The metrics structure (KAFKA_RESPONSE_METRICS) is wrapped by RichResponseMetrics instance,
  allowing for easier access to metrics using message's api_key.

  There are two metrics for each of response types (e.g. produce):
  - number of responses received,
  - response processing time in milliseconds (time between receiving a request and receiving a
    response with the same correlation id).
  There is also a metric for counting responses that could not be recognised, and one for responses
  that could caused deserialization errors.
#}

#pragma once

#include <array>
#include <functional>

#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {

/**
 * Generated metrics, we have a counter and a histogram for each request type.
 */
#define KAFKA_RESPONSE_METRICS(COUNTER, HISTOGRAM)                                                 \
{% for message_type in message_types %}                                                            \
  COUNTER({{ message_type.name_in_c_case() }})                                                     \
  HISTOGRAM({{ message_type.name_in_c_case() }}_duration, Milliseconds)                            \
{% endfor %}                                                                                       \
  COUNTER(unknown)                                                                                 \
  COUNTER(failure)

struct KafkaResponseMetrics {
  KAFKA_RESPONSE_METRICS(GENERATE_COUNTER_STRUCT, GENERATE_HISTOGRAM_STRUCT)
};

/**
 * Abstraction layer over response-related metrics.
 * Pure interface so that it can be mocked easily.
 */
class RichResponseMetrics {
public:
  virtual ~RichResponseMetrics() = default;

  /**
   * Invoked when properly-parsed message is received.
   */
  virtual void onResponse(const int16_t api_key, const long long duration) PURE;

  /**
   * Invoked when an unknown message is received.
   */
  virtual void onUnknownResponse() PURE;

  /**
   * Invoked when a deserialization error occurs.
   */
  virtual void onBrokenResponse() PURE;
};

using RichResponseMetricsSharedPtr = std::shared_ptr<RichResponseMetrics>;

/**
 * Metrics implementation that uses Envoy Scope to store metrics.
 */
class RichResponseMetricsImpl: public RichResponseMetrics {
public:
  RichResponseMetricsImpl(Stats::Scope& scope, std::string stat_prefix): metrics_({
    KAFKA_RESPONSE_METRICS(POOL_COUNTER_PREFIX(scope, fmt::format("kafka.{}.response.",
      stat_prefix)), POOL_HISTOGRAM_PREFIX(scope, fmt::format("kafka.{}.response.", stat_prefix)))})
  {};

  void onResponse(const int16_t api_key, const long long duration) override {
    // Both successful message parsing & metrics list depend on protocol-generated code, what means
    // both do support the same api keys.
    switch (api_key) {
    {% for message_type in message_types %}
    case {{ message_type.get_extra('api_key') }} :
      // Increase received message counter and update histogram with duration.
      metrics_.{{ message_type.name_in_c_case() }}_.inc();
      metrics_.{{ message_type.name_in_c_case() }}_duration_.recordValue(duration);
      return;
    {% endfor %}
    }
  }

  void onUnknownResponse() override { metrics_.unknown_.inc(); }

  void onBrokenResponse() override { metrics_.failure_.inc(); }

private:
  KafkaResponseMetrics metrics_;
};

} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
